#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from subprocess import PIPE, STDOUT, Popen
from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
from typing import Sequence

from airflow.exceptions import AirflowFailException
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class BashSensor(BaseSensorOperator):
    """
    Executes a bash command/script.

    Return True if and only if the return code is 0.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed. (templated)

    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :param output_encoding: output encoding of bash command.
    :param retry_exit_code: If task exits with this code, treat the sensor
        as not-yet-complete and retry the check later according to the
        usual retry/timeout settings. Any other non-zero return code will
        be treated as an error, and cause the sensor to fail. If set to
        ``None`` (the default), any non-zero exit code will cause a retry
        and the task will never raise an error except on time-out.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BashSensor`
    """

    template_fields: Sequence[str] = ("bash_command", "env")

    def __init__(
        self,
        *,
        bash_command: str,
        env: dict[str, str] | None = None,
        output_encoding: str = "utf-8",
        retry_exit_code: int | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.bash_command = bash_command
        self.env = env
        self.output_encoding = output_encoding
        self.retry_exit_code = retry_exit_code

    def poke(self, context: Context) -> bool:
        """
        Execute the bash command in a temporary directory.

        The command text is written to a temporary script file which is then
        run with ``bash``; the combined stdout/stderr of the process is
        forwarded line by line to the task log.

        :param context: the task execution context (not used by this sensor).
        :return: ``True`` when the command exits with code 0; ``False`` when
            the sensor should be poked again later.
        :raises AirflowFailException: if ``retry_exit_code`` is set and the
            command exits with a non-zero code other than ``retry_exit_code``.
        """
        bash_command = self.bash_command
        self.log.info("Tmp dir root location: \n %s", gettempdir())
        with TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
                f.write(bytes(bash_command, "utf_8"))
                # Make sure the script is on disk before bash reads it.
                f.flush()
                # ``f.name`` is already the full path of the file inside
                # ``tmp_dir``; prefixing it with ``tmp_dir`` again would log a
                # path that does not exist.
                script_location = f.name
                self.log.info("Temporary script location: %s", script_location)
                self.log.info("Running command: %s", bash_command)

                with Popen(
                    ["bash", script_location],
                    stdout=PIPE,
                    stderr=STDOUT,
                    close_fds=True,
                    cwd=tmp_dir,
                    env=self.env,
                    # Run the command in its own session (and process group).
                    preexec_fn=os.setsid,
                ) as resp:
                    if resp.stdout:
                        self.log.info("Output:")
                        # Read until EOF so output is logged as it is produced.
                        for line in iter(resp.stdout.readline, b""):
                            self.log.info(line.decode(self.output_encoding).strip())
                    resp.wait()
                    return_code = resp.returncode
                    self.log.info("Command exited with return code %s", return_code)

                    # zero code means success, the sensor can go green
                    if return_code == 0:
                        return True

                    # backwards compatibility: sensor retries no matter the error code
                    if self.retry_exit_code is None:
                        self.log.info("Non-zero return code and no retry code set, will retry later")
                        return False

                    # we have a retry exit code, sensor retries if return code matches, otherwise error
                    if return_code == self.retry_exit_code:
                        self.log.info("Return code matches retry code, will retry later")
                        return False

                    raise AirflowFailException(f"Command exited with return code {return_code}")
